package com.tplhk.rocketmq;

import cn.hutool.json.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.tplhk.vo.Args;
import com.tplhk.vo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;

@Slf4j
//@RocketMQTransactionListener(txProducerGroup = "text-producer")
public class TransactionalMsgListener implements RocketMQLocalTransactionListener {

    /**
     * 开启生产事务后(producer.sync: true), 消息发送到 broker 后，会回调本地事务
     * 根据返回值 COMMIT， ROLLBACK ,  broker 会做出相应的动作。
     * 如果返回值是 UNKNOWN ，则会马上执行 checkLocalTransaction
     *
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // 获取前面生成的事务ID
            String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            Args args = null;
            String argStr = message.getHeaders().get("args", String.class);
            if (Strings.isNotBlank(argStr)) {
                args = JSONObject.parseObject(argStr, Args.class);
            }
            // 以事务ID为主键，执行本地事务
            Order order = JSONObject.parseObject((byte[]) message.getPayload(), Order.class);
            log.info("解析入参 transactionId = {}, args = {}, order = {}", transactionId, args, order);

            boolean result = this.saveOrder(order, transactionId);
            log.info("运行本地事务 transactionId = {}, 运行状态 = {}", transactionId, result);
            return result ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    private boolean saveOrder(Order order, String transactionId) {
        // 将事务ID设置为唯一键
        log.info("调用数据库Insert into 订单表 : transactionId = {} ", transactionId);
        return true;
    }

    /**
     * 检查本地事务。
     * 当 executeLocalTransaction 返回 UNKNOWN ，或 broker 60秒未收到响应，就会回调本地事务方法
     *
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 获取事务ID
        String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务 transactionId = {}", transactionId);
        // 以事务ID为主键，查询本地事务执行情况
        if (isSuccess(transactionId)) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }

    private boolean isSuccess(String transactionId) {
        // 查询数据库 select from 订单表
        return true;
    }

}
